package org.example.dobs.demo.flink.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.StringTokenizer;

/**
 * Streaming word count, version 3.
 *
 * <p>This version demonstrates two refactorings:
 * <ol>
 *   <li>When a record has many fields, model it as a dedicated class ({@link WordAndCount})
 *       instead of referring to Tuple positions.</li>
 *   <li>When operator logic becomes complex, extract the operator into its own class
 *       ({@link SplitLine}).</li>
 * </ol>
 */
public class WC_V3 {
    /** Host of the text socket the job reads from. */
    private static final String SOURCE_HOST = "localhost";
    /** Port of the text socket the job reads from (start a test server with {@code nc -lk 9999}). */
    private static final int SOURCE_PORT = 9999;
    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "WordCount_Job";

    /** Driver class: only {@link #main(String[])} and the nested types are meant to be used. */
    private WC_V3() {
    }

    /**
     * Builds and runs the word-count pipeline: socket text source, split into words,
     * group by word, running sum of the counts, print.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the Flink execution environment.
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: input - one record per line received on the socket.
        DataStreamSource<String> inputDataStream = see.socketTextStream(SOURCE_HOST, SOURCE_PORT);

        // Step 3: processing - flatMap into (word, 1) records, key by word, sum the counts.
        // The key is extracted with a type-safe selector rather than the deprecated string
        // field expression keyBy("word"), so a renamed field is caught at compile time.
        SingleOutputStreamOperator<WordAndCount> result = inputDataStream
                .flatMap(new SplitLine())
                .keyBy(WordAndCount::getWord)
                .sum("count");

        // Step 4: output - print every updated running count.
        result.print();

        // Finally, submit the job; nothing runs until execute() is called.
        see.execute(JOB_NAME);
    }

    /**
     * A word together with its occurrence count.
     *
     * <p>The public no-argument constructor and the getter/setter pairs are kept deliberately:
     * the field-name aggregation {@code sum("count")} in {@link WC_V3#main(String[])} addresses
     * the {@code count} field by name, which relies on Flink handling this class as a POJO.
     */
    public static class WordAndCount {
        private String word;
        private int count;

        /** Creates an empty instance; the fields are populated through the setters. */
        public WordAndCount() {
        }

        /**
         * Creates an instance for the given word and count.
         *
         * @param word  the word
         * @param count the number of occurrences
         */
        public WordAndCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        /** Returns a readable form such as {@code WordAndCount{word='a', count=1}}. */
        @Override
        public String toString() {
            return "WordAndCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        /** @return the word */
        public String getWord() {
            return word;
        }

        /** @param word the word */
        public void setWord(String word) {
            this.word = word;
        }

        /** @return the number of occurrences */
        public int getCount() {
            return count;
        }

        /** @param count the number of occurrences */
        public void setCount(int count) {
            this.count = count;
        }
    }


    /**
     * Splits each input line into tokens and emits one {@link WordAndCount} with a count of 1
     * per token.
     */
    public static class SplitLine implements FlatMapFunction<String, WordAndCount> {

        /**
         * Tokenizes {@code line} with {@link StringTokenizer}'s default delimiters
         * and emits a {@code (token, 1)} record for every token found.
         *
         * @param line the input line
         * @param out  collector receiving one record per token
         */
        @Override
        public void flatMap(String line, Collector<WordAndCount> out) throws Exception {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                out.collect(new WordAndCount(tokens.nextToken(), 1));
            }
        }
    }
}
